IO模型

1. IO复用

进程需要这样一种功能:内核一旦发现进程指定的一个或多个IO条件(事件)就绪(输入准备好被读取或者输出准备好被输出),它通知进程,这个就叫做I/O复用。

IO复用的典型使用场景

  • 客户处理多个描述符号(通常是交互式输入和网络套接字)时候,必须使用I/O复用,例如实际的网络聊天,进程既要等待用户的输入,又要处理来自网络套接字的输入,采用I/O复用可以大幅度提高并发度和节省资源;

  • TCP服务器既要处理监听套接字,又要处理已经连接的套接字,这时候需要I/O复用以提高并发性能;实际上常见的并发连接的客户数多,但是每个连接的任务轻(CPU时间短)比如多人在线聊天室;连接并发数多,但是每个连接不都是长期处于活跃状态,可能是偶尔I/O一些数据,这时候需要I/O复用,让系统别被这些Lazy的客户端拖垮;

  • 如果一个服务器需要同时处理多个服务或协议,一般使用IO复用;

其实,不难看出I/O复用的目的是为了让计算机的资源得到尽可能最大化的利用,提高并行化程度达到这种目的的一种有效途径。

I/O复用在某种程度上和操作系统的中断处理机制何其的相似。

操作系统作为计算机的核心软件,管理着众多的硬件设备,具有极大的并发性能,我们可以同时听音乐,同时拷文件,同时跑程序等等;如此高的并发能力得益与中断机制给操作系统带来的异步能力,当某个硬件设备特定的事件发生时(比如磁盘就绪),就引发一个硬件中断,操作系统能捕获到这些中断,将当前运行的进程挂起,然后转而执行该中断对应的中断处理程序。

这种机制有点贪心的思想,操作系统总是尽可能的少等待,有活就干,没活就离开;这种机制中,众多硬件设备就类似与IO复用里面的众多Lazy的客户,他们需要与服务器(这里是操作系统)保持连接(做到随叫随到),但是又总是需要操作系统。中断是操作系统运行的动力源泉,这是一个典型的异步模型。I/O复用依赖于内核提供的异步能力,其抽象程度与操作系统和硬件设备的模型相似,这样能充分利用计算机的资源(软件充分利用操作系统,操作系统充分利用硬件)。

2. I/O模型

  • 阻塞IO
  • 非阻塞IO
  • IO复用
  • 信号驱动IO
  • 异步IO

应用程序典型的I/O包含两个不同的阶段:

以输入为例

  1. IO请求就绪阶段 应用程序向内核发出请求,检查硬件设备是否准备好数据(例如等待数据从网络到达、用户键盘敲击);若硬件设备就绪,产生中断;
  2. IO实际读写阶段 内核捕获处理中断,然后内核空间将数据拷贝到用户的进程空间中,CPU切换到用户态;

这两个阶段的产生主要是由于计算机目前的体系结构决定的(最顶层用户程序运行在用户态,中间层内核工作在内核态,最底层是硬件);这两个阶段都有可能产生阻塞,在内核与硬件之间,应付这种阻塞采用了基于事件的中断机制以提高并发性;而在内核与用户进程之间,异步IO机制也将为用户进程并发程度提高带来曙光。

2.1 阻塞IO

最普遍的IO模型都是阻塞模型,这种模型在IO的的第一阶段(向内核发出I/O请求的系统调用)时,被阻塞直到I/O就绪,再执行实际I/O操作

2.2 非阻塞IO

进程通知内核,进程所发起的IO请求I/O未就绪的条件下,也不能把进程阻塞(挂起),而是返回一个错误;该模型可能需要进程不断的去轮询某个FD是否IO就绪,这样的轮询模型比较少见,轮询会额外的浪费大量的CPU资源。

2.3 IO复用

该模型本质上属于非阻塞模型I/O,常见的是selectpoll机制。对于普通的非阻塞模型,一个进程或者线程始终轮询同一个FD,这样效率较低,特别是对于同时需要处理很多FD的程序;

而IO复用模型,可以让进程或者线程同时管理(轮询)多个FD,以select为例,select能够管理许多FD上的不同事件,select调用在所有FD都没有IO事件(就绪、可读、可写等)产生时会阻塞,虽然select调用会阻塞,但是其管理轮询的FD不是阻塞的,因此该模型依然算是非阻塞IO模型

该模型通常和阻塞IO模型+多线程作对比,在典型的socket服务器端,该模型和阻塞IO模型+多线程可以实现相同的功能,大多数情况下效果性能也不会差太离谱;

但是考虑在某些特定的场景下,例如客户端连接数目众多,但是每个连接今次进行IO次数比较多,IO的数据比较少(典型的就是多人在线聊天,大量的客户端同服务器保持连接,但是每一个连接“走走停停”,隔一会来几个字节的数据),在阻塞IO模型+多线程模型中,每一个连接对应于一个线程,大量的长连接会很快消耗掉计算机中(也可能是线程池)中所有线程资源,从而拒绝了额外的新的客户连接请求,但是此时计算机的CPU却是闲置的,因为很多线程其实是在阻塞状态,因此这种场景下,该模型没有很好的提高系统的并发性;

而采用I/O复用模型,用一个或多个线程去select或者poll检查多个连接上的IO事件,当IO事件(可读、可写等)发生,就利用新开线程池中的线程来处理IO,由于每一次的IO的任务很轻,并且IO数据已经来到了内核或者进程的缓冲区,这样新开的线程很快就会被线程池回收,这样当新来的客户连接请求进来,系统仍然有足够的线程资源来处理请求,极大的提高了该场景下系统的吞吐量,IO复用模型要远优于阻塞IO模型+多线程

实际应用中,辨证的选择IO复用模型阻塞IO模型+多线程

2.4 信号驱动IO

进程不会阻塞在第一阶段,当IO就绪时,内核会通过信号的形式通知进程;

2.5 异步IO

进程在整个IO过程(IO请求和IO读写阶段)都不会阻塞,而是在IO完成后由内核通知进程;

2.6 同步IO与异步IO

在某些术语中,阻塞和非阻塞用于区分IO的第一个阶段,即IO的请求阶段
同步和异步用于区分IO的第二个阶段,即IO的读写阶段

因此,按照这个来重新分类上述五种IO模型,就可以分为

  • 同步IO

    • 阻塞IO (最普遍的同步阻塞BIO)
    • 非阻塞IO (Java NIO, 需要轮询,不停的轮询某个FD是否就绪)
    • IO复用 (Java NIO的Select机制,同时监听多个FD,虽然单个FD不阻塞,但是当所有FD都没有就绪时,select调用会阻塞;一个阻塞换来多个不阻塞)
    • 信号驱动IO (同同步非阻塞IO,不用轮询,而是直接接收内核发来的IO信号)
  • 异步IO (Java AIO, IO的两个阶段都不会阻塞,进程只需要发起IO请求,在IO完成两个阶段后,由内核负责通知进程,期间进程都不会挂起)

2.7 5种IO模型的Java示例

  • (1)阻塞IO + 多线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55


package iomodel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class BIODemo {

public static void main(String[] args) {
// TODO Auto-generated method stub
Executor pools = Executors.newFixedThreadPool(5);
try {
ServerSocket sS = new ServerSocket(8888);
for (;;) {
Socket s = sS.accept();//blocking
pools.execute(new BIOWorker(s));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

class BIOWorker implements Runnable {
Socket client = null;

public BIOWorker(Socket client) {
super();
this.client = client;
}

@Override
public void run() {
// TODO Auto-generated method stub
try {
BufferedReader is = new BufferedReader(new InputStreamReader(client.getInputStream()));
String line = null;
while ((line = is.readLine()) != null)//blocking
System.out.println(line);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
  • (2)非阻塞IO模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package iomodel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
public class NBIODemo {

public static void main(String[] args) {
// TODO Auto-generated method stub
try {
Charset ascii = Charset.forName("us-ascii");
CharsetDecoder decoder = ascii.newDecoder();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress("localhost", 8888));

SocketChannel sC = null;

while ((sC = ssc.accept()) == null)
System.out.println("i am pooling for new connection!");// poll until new connection was established.non-blocking
sC.configureBlocking(false);

ByteBuffer buffer = ByteBuffer.allocate(1024);

while (true) {//pool for reading
int r = sC.read(buffer);//non-blocking
System.out.println("i am pooling for new data!");
if (r > 0) {
buffer.flip();
System.err.println("recv " + r +" bytes from " + sC);
System.err.print(decoder.decode(buffer));
buffer.compact();
}
}

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
  • (3)I/O复用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package iomodel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MultiplexingIODemo {

public static void main(String[] args) {
// TODO Auto-generated method stub
Executor pool = Executors.newFixedThreadPool(5);
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);// listening socket channel is
// non-blocking for accept()

ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress("localhost", 8888));

ssc.register(selector, SelectionKey.OP_ACCEPT);

for (;;) {

// System.out.println("I am selecting..");
selector.select();// may block until there occurred at least one
// IO event.

Set<SelectionKey> selectKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectKeys.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
ServerSocketChannel ssc1 = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc1.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
iterator.remove();
System.out.println("Got connection from " + sc);
} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.attachment() == null) {
ByteBuffer buff = ByteBuffer.allocate(1024);
key.attach(buff);
System.out.println("first recv data from " + sc);
}
pool.execute(new IOWorker(sc, (ByteBuffer) key.attachment()));
iterator.remove();
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

class IOWorker implements Runnable {
ByteBuffer buffer = null;
SocketChannel sC = null;
Charset gb2312 = Charset.forName("gb2312");
CharsetDecoder decoder = gb2312.newDecoder();

public IOWorker(SocketChannel sC, ByteBuffer buffer) {
super();
this.sC = sC;
this.buffer = buffer;

}

@Override
public void run() {
// TODO Auto-generated method stub
int r;
try {
r = this.sC.read(buffer);
if (r > 0) {
System.err.println("recv " + r + " new bytes from " + sC);
buffer.flip();
// get first byte
// decode bytes stream
for (; buffer.hasRemaining();) {
byte fisrt = buffer.get(0);
if (Byte.toUnsignedInt(fisrt) <= 127) {// 高字节的大小判断
fisrt = buffer.get();
System.err.println("[to " + sC.socket().getPort() + " ] "+ (char) fisrt);
} else if (buffer.remaining() >= 2) {
byte[] b = new byte[2];
b[0] = buffer.get();
b[1] = buffer.get();
String s = new String(b, "gbk");
System.err.println("[to " + sC.socket().getPort() + " ] " + s);
} else
break;
}
buffer.compact();
}

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
  • (4)异步I/O
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package iomodel;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class AIODemo {

public static void main(String[] args) {
// TODO Auto-generated method stub
try {
AsynchronousFileChannel aFC = AsynchronousFileChannel.open(Paths.get("a.txt"), StandardOpenOption.READ);
System.out.println("main " + Thread.currentThread().getId());
ByteBuffer buffer = ByteBuffer.allocate((int) new File("a.txt").length());
Future<Integer> result = aFC.read(buffer, 0);
while (!result.isDone()) {
System.out.println("i am doing other useful work...");
}
buffer.flip();
System.out.println(result.get());

System.out.println("read is done");

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

}

class FileReadHandler implements CompletionHandler<Integer, AsynchronousFileChannel> {

@Override
public void completed(Integer result, AsynchronousFileChannel attachment) {
System.out.println("new connection from " + result);
}

@Override
public void failed(Throwable exc, AsynchronousFileChannel attachment) {
// TODO Auto-generated method stub

}

}

3. 总结

  • 几种IO模型的优缺点,模型的阻塞、非阻塞以及同步、异步指的是什么?
  • select模型与阻塞+多线程模型的区别以及使用场景;

4. References

[1] 史蒂文斯,芬纳,鲁道夫.UNIX网络编程卷一[M].北京:人民邮电出版社,2015:122-148